/*low level file system*/

#include "config.h"

#include <dirent.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <time.h>

#define DBG_SUBSYS S_LIBSTORAGE

/*low level file system*/

#include "lsv.h"
#include "sysy_lib.h"
#include "cache.h"
#include "chunk.h"
#include "vnode.h"
#include "lichstor.h"
#include "system.h"
#include "etcd.h"
#include "rmvol_bh.h"
#include "rmsnap_bh.h"
#include "lich_md.h"
#include "md_map.h"
#include "lease.h"
#include "../replica/replica.h"
#include "net_global.h"
#include "main_loop.h"
#include "dbg.h"
#include "volume_proto_readcache.h"
#include "local_vol.h"
#include "stor_root.h"
#include "core.h"
#include "storage_status.h"

/*
 * Argument bundle handed to a callback through core_request0().
 * In the code visible here only volid/size/offset/buf/retval are used
 * (see stor_write_remote() / stor_read_remote()).
 */
typedef struct {
        volid_t volid;   /* target volume of the I/O */
        size_t size;     /* number of bytes to transfer */
        off_t offset;    /* byte offset inside the volume */
        buffer_t *buf;   /* caller-owned data buffer */

        chkid_t chkid;   /* NOTE(review): not referenced in the visible part of this file */
        int retval;      /* result of the callback, read back by the requester */
        nid_t *dist;     /* NOTE(review): not referenced in the visible part of this file */
        int dist_count;  /* NOTE(review): presumably the length of dist - confirm */
} core_ctx_t;

/*
 * Fetch the root chunk id of @pool into @rootid via md_root().
 * Asserts that the id obtained is not the null id.
 * Returns 0 on success, otherwise _errno() of the md_root() error.
 */
int stor_rootid(chkid_t *rootid, const char *pool)
{
        int ret = md_root(rootid, pool);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        YASSERT(!chkid_isnull(rootid));
        return 0;

err_ret:
        return _errno(ret);
}

/*
 * Resolve the parent of @fileid: first find the serving node with
 * md_map_getsrv(), then ask it through md_getparent().
 * Returns 0 on success, otherwise _errno() of the failing call.
 */
static int __stor_lookup_parent(const fileid_t *fileid, fileid_t *parent)
{
        nid_t nid;
        int ret = md_map_getsrv(fileid, &nid);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = md_getparent(&nid, fileid, parent);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;

err_ret:
        return _errno(ret);
}

/*
 * Look up @name inside directory @parent and store its id in @fileid.
 *  - an empty name is rejected with EINVAL;
 *  - ".." is resolved through __stor_lookup_parent();
 *  - anything else goes through md_lookup_byname().
 * Returns 0 on success, an errno value otherwise.
 */
int stor_lookup(const char *pool, const fileid_t *parent, const char *name, fileid_t *fileid)
{
        int ret;
        char buf[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)buf;

        if (*name == '\0') {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        if (!strcmp(name, ".."))
                return __stor_lookup_parent(parent, fileid);

        ret = md_lookup_byname(pool, parent, name, chkinfo, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        *fileid = chkinfo->id;
        return 0;

err_ret:
        return _errno(ret);
}

/*
 * Resolve an absolute @path inside @pool to a file id.
 * "/" maps to the pool root; any other path is split with _path_split2()
 * into parent and leaf, the parent is resolved recursively and the leaf
 * is then looked up inside it.
 * Returns 0 on success, an errno value otherwise.
 */
int stor_lookup1(const char *pool, const char *path, fileid_t *id)
{
        int ret;
        fileid_t parentid;
        char parent[MAX_NAME_LEN], name[MAX_NAME_LEN];

        YASSERT(strcmp(path, "//"));

        DBUG("pool %s path %s\n", pool, path);

        /* Base case of the recursion: the root directory. */
        if (strcmp(path, "/") == 0) {
                ret = stor_rootid(id, pool);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                return 0;
        }

        ret = _path_split2(path, parent, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = stor_lookup1(pool, parent, &parentid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = stor_lookup(pool, &parentid, name, id);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;

err_ret:
        return _errno(ret);
}

/*
 * Resolve @path to a file id, accepting an optional "<path>@<name>" form.
 *
 * The path is split on '@'. When the split does not yield exactly two
 * parts the whole string is resolved with stor_lookup1(); otherwise the
 * first part is resolved as a path and the second part is looked up by
 * name inside it (presumably "volume@snapshot" - confirm with callers).
 *
 * Returns 0 on success, ENAMETOOLONG when @path does not fit the local
 * split buffer, or the error of the failing lookup.
 */
int stor_lookup1x(const char *pool, const char *path, fileid_t *id)
{
        int ret, count;
        size_t len;
        char tmp[MAX_GROUP_NAME_LEN];
        char *list[2];
        fileid_t parentid;

        /*
         * _str_split() works on a private copy of the path; refuse input
         * that would overflow it instead of smashing the stack.
         */
        len = strlen(path);
        if (len >= sizeof(tmp)) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        memcpy(tmp, path, len + 1);

        count = 2;
        _str_split(tmp, '@', list, &count);

        if (count != 2) {
                return stor_lookup1(pool, path, id);
        }

        ret = stor_lookup1(pool, list[0], &parentid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = stor_lookup(pool, &parentid, list[1], id);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Send a LOCAL_VOL_ADD notification for the single volume @chkid, under
 * @name, through dispatch_vol_notifiction().
 * Returns 0 on success or the dispatch error.
 */
int stor_mkvol_sync2master(const char *name, const chkid_t *chkid)
{
        int ret;
        char buf[LOCAL_VOL_SIZE];
        local_vol_t *local_vol = (void *)buf;

        /* A one-element volume list. */
        local_vol->vols[0] = *chkid;
        local_vol->volcount = 1;

        ret = dispatch_vol_notifiction(name, local_vol, LOCAL_VOL_ADD);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;

err_ret:
        return ret;
}

/*
 * Clamp the requested replica count to what the cluster can hold.
 * Nothing is done in solo mode. Otherwise the fault-domain count is read
 * with conn_faultdomain(); fewer than LICH_REPLICA_MIN + 1 domains yields
 * ENOSPC, and a replica count of @total or more is lowered to total - 1.
 */
static int __stor_mkvol_replia_ajust(uint32_t *_repnum)
{
        int ret;
        int total, online;
        int repnum = *_repnum;

        if (cluster_is_solomode())
                return 0;

        ret = conn_faultdomain(&total, &online);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (total < LICH_REPLICA_MIN + 1) {
                ret = ENOSPC;
                GOTO(err_ret, ret);
        }

        if (repnum + 1 > total)
                *_repnum = total - 1;

        return 0;

err_ret:
        return ret;
}

/*
 * Create a volume named param->name under @parent.
 *
 * Attributes come from param->setattr when it is supplied (it is then
 * modified in place by the overrides below); otherwise a fresh attribute
 * set is initialised, filled from the parent's fileinfo and, if a replica
 * count is set, adjusted by __stor_mkvol_replia_ajust().
 * param->priority, param->repnum, param->volume_format (LSV builds only)
 * and param->size are then applied on top.
 *
 * On success the new id is stored in *_fileid when _fileid is non-NULL.
 * Returns 0, EINVAL for an empty name, or _errno() of the failing call.
 */
int stor_mkvol_with_area(fileid_t *_fileid, const fileid_t *parent, vol_param_t *param)
{
        int ret;
        setattr_t setattr, *tmp;
        chkinfo_t *chkinfo;
        char buf[CHKINFO_MAX];
        fileinfo_t fileinfo;

        if (strlen(param->name) == 0) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        chkinfo = (void *)buf;

        if (param->setattr) {
                /* caller-provided attributes: used (and updated) directly */
                tmp = (void *)param->setattr;
        } else {
                /* no attributes given: start from defaults plus the parent's info */
                tmp = &setattr;
                md_initattr(tmp, __S_IFREG, 0);

                ret = md_getattr(parent, &fileinfo);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                md_info2attr(tmp, &fileinfo);

                if (tmp->replica.set_it) {
                        ret = __stor_mkvol_replia_ajust(&tmp->replica.val);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }
        }

        /* -1 means "priority not specified" */
        if (param->priority != -1) {
                tmp->priority.set_it = 1;
                tmp->priority.val = param->priority;
        }

        /* an explicit, in-range replica count overrides the adjusted value */
        if (param->repnum >= LICH_REPLICA_MIN && param->repnum <= LICH_REPLICA_MAX) {
                tmp->replica.set_it = 1;
                tmp->replica.val = param->repnum;
        }

#if LSV
        /* only the three known formats are recorded; anything else leaves lsv unset */
        if (param->volume_format == VOLUME_FORMAT_ROW2) {
                tmp->lsv.set_it = 1;
                tmp->lsv.val = VOLUME_FORMAT_ROW2;
        } else if (param->volume_format == VOLUME_FORMAT_ROW3) {
                tmp->lsv.set_it = 1;
                tmp->lsv.val = VOLUME_FORMAT_ROW3;
        } else if (param->volume_format == VOLUME_FORMAT_LSV) {
                tmp->lsv.set_it = 1;
                tmp->lsv.val = VOLUME_FORMAT_LSV;
        }

        /* LSV builds set the size with __SET_LOGICAL_SIZE */
        if (param->size > 0) {
                tmp->size.set_it = __SET_LOGICAL_SIZE;
                tmp->size.size = param->size;
        }

#else
        /* non-LSV builds set the size with __SET_PHYSICAL_SIZE */
        if (param->size > 0) {
                tmp->size.set_it = __SET_PHYSICAL_SIZE;
                tmp->size.size = param->size;
        }
#endif

        ret = md_mkvol(parent, param->name, param->site, tmp, chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (_fileid)
                *_fileid = chkinfo->id;

        return 0;
err_ret:
        return _errno(ret);
}

/*
 * Convenience wrapper around stor_mkvol_with_area(): builds a default
 * vol_param_t carrying only @name and the optional @_setattr.
 * Returns 0 on success or the error of stor_mkvol_with_area().
 */
int stor_mkvol(const fileid_t *parent, const char *name, const setattr_t *_setattr, fileid_t *_fileid)
{
        int ret;
        vol_param_t param;

        vol_param_init(&param);

        /* NOTE(review): unbounded copy - relies on callers passing a name that fits param.name */
        strcpy(param.name, name);
        param.setattr = (void *)_setattr;

        ret = stor_mkvol_with_area(_fileid, parent, &param);
        if (ret) {
                GOTO(err_ret, ret);
        }

        return 0;

err_ret:
        return ret;
}

/*
 * Fill @pool with the pool name of chunk @id using md_getpool().
 * Returns 0 on success, otherwise _errno() of the error.
 */
int stor_getpool(const chkid_t *id, char *pool)
{
        int ret = md_getpool(id, pool);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;

err_ret:
        return _errno(ret);
}

/* Forward to vnode_reset() and return its result unchanged. */
int stor_reset(const char *pool, const chkid_t *id)
{
        int ret;

        ret = vnode_reset(pool, id);

        return ret;
}

/*
 * Read the attributes of @id and convert them into a struct stat.
 * Returns 0 on success, otherwise _errno() of the vnode_getattr() error.
 */
int stor_getattr(const char *pool, const chkid_t *id, struct stat *stbuf)
{
        fileinfo_t fileinfo;
        int ret;

        /*
         * Distributed cache consistency: the vnode layer caches attributes,
         * so the latest value may be observed with some delay. Where strict
         * consistency is required (e.g. noticing a volume expansion as soon
         * as possible) the cache has to be invalidated and reloaded.
         * NOTE(review): the trailing 1 presumably requests that reload - confirm.
         */
        ret = vnode_getattr(pool, id, &fileinfo, 1);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        md_info2stat(stbuf, &fileinfo);

        DBUG(""CHKID_FORMAT" mode %x, size %ju\n",
             CHKID_ARG(id), stbuf->st_mode, stbuf->st_size);

        return 0;

err_ret:
        return _errno(ret);
}

/*
 * Store the snap_version field of @id's fileinfo in *snap_version.
 * Returns 0 on success, otherwise _errno() of the vnode_getattr() error.
 */
int stor_getsnapversion(const char *pool, const chkid_t *id, uint64_t *snap_version)
{
        fileinfo_t fileinfo;
        int ret = vnode_getattr(pool, id, &fileinfo, 1);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        *snap_version = fileinfo.snap_version;
        return 0;

err_ret:
        return _errno(ret);
}

/*
 * Create pool (directory) @name under @parent.
 *
 * The attribute set is either the caller's @_setattr (written to in
 * place, despite the const qualifier) or a local one initialised with
 * md_initattr(); in both cases it is then filled from the parent's
 * fileinfo. A non-NULL @ec whose plugin is not PLUGIN_NULL is copied in.
 * On success the new id is stored in *_fileid when _fileid is non-NULL.
 *
 * Returns 0, EINVAL for an empty name, or _errno() of the failing call.
 */
int stor_mkpool_with_area(const fileid_t *parent, const char *name, ec_t *ec,
                          const char *site_name, const setattr_t *_setattr, fileid_t *_fileid)
{
        int ret;
        fileinfo_t fileinfo;
        setattr_t setattr, *tmp;
        char buf[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)buf;

        if (name[0] == '\0') {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = md_getattr(parent, &fileinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (_setattr) {
                tmp = (void *)_setattr;
        } else {
                tmp = &setattr;
                md_initattr(tmp, __S_IFDIR, 0);
        }

        /* Both variants take their values from the parent's info. */
        md_info2attr(tmp, &fileinfo);

        if (ec != NULL && ec->plugin != PLUGIN_NULL) {
                tmp->ec.set_it = 1;
                tmp->ec.ec.plugin = ec->plugin;
                tmp->ec.ec.tech = ec->tech;
                tmp->ec.ec.m = ec->m;
                tmp->ec.ec.k = ec->k;
                tmp->ec.ec.log_size = ec->log_size;
                tmp->ec.ec.data_size = ec->data_size;
        }

        ret = md_mkpool_with_area(parent, name, site_name, tmp, chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (_fileid != NULL)
                *_fileid = chkinfo->id;

        return 0;

err_ret:
        return _errno(ret);
}

/* Create a pool with no EC settings and an empty site name. */
int stor_mkpool(const fileid_t *parent, const char *name, const setattr_t *_setattr, fileid_t *_fileid)
{
        int ret;

        ret = stor_mkpool_with_area(parent, name, NULL, "", _setattr, _fileid);

        return ret;
}

/*
 * Open a pool-listing session identified by @uuid on @fileid through
 * md_listpool_open(). Returns 0 on success or the underlying error.
 */
int stor_listpool_open(const fileid_t *fileid, const char *uuid)
{
        int ret = md_listpool_open(fileid, uuid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;

err_ret:
        return ret;
}

/*
 * Read one batch of directory entries of @fileid starting at @offset.
 *
 * A zeroed BIG_BUF_LEN buffer is allocated and filled by md_listpool().
 * When entries were returned the buffer is handed to the caller through
 * *_de / *_delen (the caller then owns it); when nothing was returned
 * the buffer is released and *_de = NULL, *_delen = 0.
 *
 * Returns 0 on success, otherwise _errno() of the failing call.
 */
int stor_listpool(const fileid_t *fileid, const char *uuid, uint64_t offset, void **_de, int *_delen)
{
        int ret, delen;
        void *de;

        ret = ymalloc((void **)&de, BIG_BUF_LEN);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        DBUG("listpool "CHKID_FORMAT" offset %llu\n", CHKID_ARG(fileid), (LLU)offset);

        delen = BIG_BUF_LEN;
        memset(de, 0, delen);

        ret = md_listpool(fileid, uuid, offset, de, &delen);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        if (delen == 0) {
                /* Nothing listed: do not hand out an empty buffer. */
                yfree((void **)&de);
                *_delen = 0;
                *_de = NULL;
                return 0;
        }

        *_de = de;
        *_delen = delen;
        return 0;

err_free:
        yfree((void **)&de);
err_ret:
        return _errno(ret);
}


/*
 * Close the pool-listing session @uuid on @fileid through
 * md_listpool_close(). Returns 0 on success or the underlying error.
 */
int stor_listpool_close(const fileid_t *fileid, const char *uuid)
{
        int ret = md_listpool_close(fileid, uuid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;

err_ret:
        return ret;
}

/*
 * Placeholder for an extended pool listing.
 * The only handled input is the sentinel offset 2147483647, which reports
 * an empty result (*delen = 0; *de is left untouched). Every other offset
 * reaches UNIMPLEMENTED(__DUMP__).
 * NOTE(review): what UNIMPLEMENTED does (abort vs. log) is not visible here.
 */
int stor_listpoolplus(const fileid_t *fileid, off_t offset, void **de, int *delen)
{
        if (offset == 2147483647) {  /* 2GB - 1 */
                *delen = 0;
                return 0;
        }

        UNIMPLEMENTED(__DUMP__);

        /* silence unused-parameter warnings */
        (void) offset;
        (void) de;
        (void) delen;
        (void) fileid;

        return 0;
}

/* Change the mode of @fileid via vnode_chmod(); the result goes through _errno(). */
int stor_chmod(const char *pool, const fileid_t *fileid, mode_t mode)
{
        int ret;

        ret = vnode_chmod(pool, fileid, mode);

        return _errno(ret);
}

/* Change owner and group of @fileid via vnode_chown(); the result goes through _errno(). */
int stor_chown(const char *pool, const fileid_t *fileid, uid_t owner, gid_t group)
{
        int ret;

        ret = vnode_chown(pool, fileid, owner, group);

        return _errno(ret);
}

/* Set access/modification times of @fileid via vnode_utime(); the result goes through _errno(). */
int stor_utime(const char *pool, const fileid_t *fileid, uint32_t atime, uint32_t mtime)
{
        int ret;

        ret = vnode_utime(pool, fileid, atime, mtime);

        return _errno(ret);
}

/* Truncate volume @fileid to @length via volume_truncate(); the result goes through _errno(). */
int stor_truncate(const char *pool, const fileid_t *fileid, uint64_t length)
{
        int ret;

        ret = volume_truncate(pool, fileid, length);

        return _errno(ret);
}

/*
 * Send a LOCAL_VOL_DEL notification for the single volume @chkid, under
 * @name, through dispatch_vol_notifiction().
 * Returns 0 on success or the dispatch error.
 */
int stor_rmvol_sync2master(const char *name, const chkid_t *chkid)
{
        int ret;
        char buf[LOCAL_VOL_SIZE];
        local_vol_t *local_vol = (void *)buf;

        /* A one-element volume list. */
        local_vol->vols[0] = *chkid;
        local_vol->volcount = 1;

        ret = dispatch_vol_notifiction(name, local_vol, LOCAL_VOL_DEL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;

err_ret:
        return ret;
}

/*
 * Remove volume @name from @parent.
 * Unless @force is set, a volume that still has snapshots is refused
 * with EPERM. Returns 0 on success, an errno value otherwise.
 */
int stor_rmvol(const fileid_t *parent, const char *name, int force)
{
        int ret;
        int empty = 0;
        char _chkinfo[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)_chkinfo;

        ret = md_lookup_byname1(parent, name, chkinfo, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!force) {
                ret = md_snapshot_isempty(&chkinfo->id, &empty);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                if (!empty) {
                        ret = EPERM;
                        DWARN("There are snapshots on this volume. No deletion is allowed !\n");
                        GOTO(err_ret, ret);
                }
        }

        ret = md_rmvol(parent, name, force);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;

err_ret:
        return _errno(ret);
}

/*
 * Remove the pool @name from @parent.
 * Fails with ENOTDIR when the entry is not a pool chunk and with
 * ENOTEMPTY when md_isempty() reports content.
 * Returns 0 on success, an errno value otherwise.
 */
int stor_rmpool(const char *pool, const fileid_t *parent, const char *name)
{
        int ret, empty;
        char buf[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)buf;

        ret = md_lookup_byname(pool, parent, name, chkinfo, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (chkinfo->id.type != __POOL_CHUNK__) {
                ret = ENOTDIR;
                GOTO(err_ret, ret);
        }

        ret = md_isempty(&chkinfo->id, &empty);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!empty) {
                ret = ENOTEMPTY;
                GOTO(err_ret, ret);
        }

        ret = md_rmpool(parent, name);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;

err_ret:
        return _errno(ret);
}

/*
 * Report capacity figures for @pool in statvfs form.
 *
 * The result is cached in function-local static storage and recomputed
 * at most once every 60 seconds. A recomputation walks the node list
 * (cluster_listnode_open/listnode/close) and, for every node whose info
 * can be fetched, calls diskmd_pool_dfree() to obtain total/used bytes.
 *
 * The cache timestamp is only advanced after a successful refresh;
 * updating it on cache hits as well would keep postponing the refresh
 * for any caller that polls more often than once a minute.
 *
 * NOTE(review): the static cache is shared by all callers and pools and
 * is not protected against concurrent use - confirm the calling context.
 *
 * @fileid is unused. Returns 0 on success, an errno value otherwise.
 */
int stor_statvfs(const char *pool, const fileid_t *fileid, struct statvfs *vfs)
{
        int ret, buflen, done = 0;
        char buf[MAX_BUF_LEN], tmp[MAX_BUF_LEN], uuid[MAX_NAME_LEN] = {};
        uint64_t offset = 0, offset2 = 0, used = 0, total = 0, avail;
        struct dirent *de;
        nodeinfo_t info;
        static struct statvfs prev;
        static time_t prev_load = 0;
        time_t now = 0;
        uuid_t _uuid;

        (void) fileid;

        now = gettime();
        if (now - prev_load < 60) {
                /* cached figures are still fresh */
                *vfs = prev;
                return 0;
        }

        uuid_generate(_uuid);
        uuid_unparse(_uuid, uuid);

        ret = cluster_listnode_open(uuid);
        if (ret)
                GOTO(err_ret, ret);

        while (done == 0) {
                memset(buf, 0, sizeof(buf));
                ret = cluster_listnode(buf, &buflen, uuid, offset);
                if (unlikely(ret)) {
                        GOTO(err_close, ret);
                } else if (buflen == 0)
                        break;

                offset2 = 0;
                dir_for_each(buf, buflen, de, offset2) {
                        if (strlen(de->d_name) == 0) {
                                /* an empty name marks the end of the listing */
                                done = 1;
                                break;
                        } else if (buflen - offset2 < sizeof(*de) + MAX_NAME_LEN)
                                break;

                        offset += de->d_reclen;

                        ret = node_getinfo(&info, de->d_name, tmp);
                        if (unlikely(ret)) {
                                // TODO core ret == 64 ENONET
                                /* best effort: unreachable nodes are only logged */
                                DWARN("%s: %s (%d)\n", de->d_name, strerror(ret), ret);
                        } else {
                                /*
                                 * NOTE(review): called once per reachable node with
                                 * identical arguments; whether it accumulates into or
                                 * overwrites total/used is not visible here.
                                 */
                                diskmd_pool_dfree(pool, &total, &used);
                        }
                }
        }

        /* never let "used > total" wrap around into a huge free count */
        avail = (total > used) ? (total - used) : 0;

        memset(&prev, 0x0, sizeof(prev));
        prev.f_bsize = 4096;
        prev.f_frsize = 4096;

        prev.f_blocks = total/(prev.f_bsize);
        prev.f_bfree = avail/(prev.f_bsize);
        prev.f_bavail = avail/(prev.f_bsize);

        prev.f_files = prev.f_blocks;
        prev.f_ffree = prev.f_bfree;
        prev.f_favail = prev.f_bavail;

        prev.f_flag = 1024;
        prev.f_namemax = 255;

        cluster_listnode_close(uuid);

        /* only a completed refresh restarts the 60 second window */
        prev_load = now;
        *vfs = prev;

        return 0;
err_close:
        cluster_listnode_close(uuid);
err_ret:
        return _errno(ret);
}

/* Ask vnode_isempty() whether @fileid is empty; the result goes through _errno(). */
int stor_isempty(const fileid_t *fileid, int *empty)
{
        int ret;

        ret = vnode_isempty(fileid, empty);

        return _errno(ret);
}

/*
 * Split @path into its parent directory and leaf @name, then resolve
 * the parent to @parentid with stor_lookup1().
 * Returns 0 on success, an errno value otherwise.
 */
int stor_splitpath(const char *pool, const char *path, fileid_t *parentid, char *name)
{
        char parent[MAX_PATH_LEN];
        int ret = _path_split2(path, parent, name);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = stor_lookup1(pool, parent, parentid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        DINFO("name %s parent %s("CHKID_FORMAT")\n", name, parent,
              CHKID_ARG(parentid));

        return 0;

err_ret:
        return _errno(ret);
}

/*
 * Check that @fromid and @toid are of the same kind: both regular files
 * or both directories. Returns 0 when they match, EEXIST when they do
 * not, or the stor_getattr() error.
 */
static int __stor_cap(const char *pool, const fileid_t *fromid, const fileid_t *toid)
{
        int ret, both_reg, both_dir;
        struct stat frombuf, tobuf;

        ret = stor_getattr(pool, fromid, &frombuf);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = stor_getattr(pool, toid, &tobuf);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        both_reg = S_ISREG(frombuf.st_mode) && S_ISREG(tobuf.st_mode);
        both_dir = S_ISDIR(frombuf.st_mode) && S_ISDIR(tobuf.st_mode);

        if (!both_reg && !both_dir) {
                ret = EEXIST;
                GOTO(err_ret, ret);
        }

        return 0;

err_ret:
        return _errno(ret);
}

/*
 * Rename @fromdir/@fromname to @todir/@toname.
 *
 * The destination must not exist: an existing target always fails with
 * EEXIST. The "replace an empty, same-kind target" sequence that follows
 * the unconditional GOTO below is unreachable as written.
 *
 * Returns 0 on success, an errno value otherwise.
 */
int stor_rename(const char *pool, const fileid_t *fromdir, const char *fromname,
                const fileid_t *todir, const char *toname)
{
        int ret, empty;
        fileid_t fromid, toid;

        ret = stor_lookup(pool, todir, toname, &toid);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        /* destination does not exist: go on with the rename */
                } else
                        GOTO(err_ret, ret);
        } else {
                /* destination exists: always refused */
                ret = EEXIST;
                GOTO(err_ret, ret);

                /*
                 * NOTE(review): everything from here to the end of this
                 * branch is dead code because of the GOTO above.
                 */
                ret = stor_isempty(&toid, &empty);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                if (!empty) {
                        ret = EEXIST;
                        GOTO(err_ret, ret);
                }

                ret = stor_lookup(pool, fromdir, fromname, &fromid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __stor_cap(pool, &fromid, &toid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = stor_rmvol(todir, toname, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ret = md_rename(fromdir, fromname, todir, toname);
        if (unlikely(ret)) {
                DINFO("rename from "CHKID_FORMAT" %s to "CHKID_FORMAT" %s\n",
                      CHKID_ARG(fromdir), fromname, CHKID_ARG(todir), toname);
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return _errno(ret);
}

/*
 * Change the replica count of @id.
 * @repnum must lie in [1, LICH_REPLICA_MAX] (EINVAL otherwise); a volume
 * chunk whose EC plugin is PLGUIN_EC_ISA is refused with EPERM.
 * Returns 0 on success, an errno value otherwise.
 */
int stor_set_repnum(const char *pool, const fileid_t *id, int repnum)
{
        int ret;
        fileinfo_t fileinfo;

        if (!(repnum >= 1 && repnum <= LICH_REPLICA_MAX)) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = vnode_getattr(pool, id, &fileinfo, 1);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (id->type == __VOLUME_CHUNK__ && fileinfo.ec.plugin == PLGUIN_EC_ISA) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        ret = vnode_set_repnum(pool, id, repnum);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;

err_ret:
        return _errno(ret);
}

/*
 * Render a human-readable description of where chunk @oid is stored.
 *
 * @buf receives a header line (info version, last move time, replica
 * count) followed by one line per replica. A replica whose node cannot
 * be connected, named or queried is printed as "offline"; dirty or
 * check-state replicas are highlighted with ANSI colour codes.
 *
 * @buf must provide at least MAX_PATH_LEN bytes; every write into it is
 * bounded by that size, so long output is truncated, never overflowed.
 *
 * Returns 0 on success or _errno() of the md_chunk_getinfo() error.
 */
static int __stor_chunk_loctaion(const char *pool, const fileid_t *oid, char *buf)
{
        int ret, i;
        size_t len;
        char _chkinfo[CHKINFO_MAX], _netinfo[MAX_BUF_LEN], name[MAX_NAME_LEN],
                tmp1[MAX_PATH_LEN],
                tbuf[MAX_NAME_LEN],
                infobuf[MAX_BUF_LEN];
        chkinfo_t *chkinfo;
        nodeinfo_t *info;
        reploc_t *reploc;
        time_t t;
        struct tm tmt;

        chkinfo = (void *)_chkinfo;
        ret = md_chunk_getinfo(pool, NULL, oid, chkinfo, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        t = chkinfo->mtime;
        /* strftime() leaves the buffer unspecified when it returns 0 */
        if (strftime(tbuf, sizeof(tbuf), "%F %T", localtime_safe(&t, &tmt)) == 0)
                tbuf[0] = '\0';

        snprintf(buf, MAX_PATH_LEN, "info_version %llu, last move %s,"
                 " repnum: %u\n",
                 (LLU)chkinfo->info_version, tbuf, chkinfo->repnum);

        info = (void *)_netinfo;
        for (i = 0; i < (int)chkinfo->repnum; i++) {
                reploc = &chkinfo->diskid[i];

                /* any failure on the way to the node's info means "offline" */
                ret = network_connect1(&reploc->id);
                if (ret == 0)
                        ret = netable_getname(&reploc->id, name);
                if (ret == 0)
                        ret = node_getinfo(info, name, infobuf);

                /* buf is always NUL-terminated within MAX_PATH_LEN, so len < MAX_PATH_LEN */
                len = strlen(buf);

                if (unlikely(ret)) {
                        snprintf(buf + len, MAX_PATH_LEN - len,
                                 "\x1b[1;31m  *%s : offline\x1b[0m\n",
                                 network_rname(&reploc->id));
                        continue;
                }

                snprintf(tmp1, sizeof(tmp1), "%s : "CHKID_FORMAT"",
                         network_rname(&reploc->id), CHKID_ARG(oid));

                if (reploc->status ==  __S_DIRTY)
                        snprintf(buf + len, MAX_PATH_LEN - len,
                                 "\x1b[1;31m  *%s, dirty\x1b[0m\n", tmp1);
                else if (reploc->status ==  __S_CHECK)
                        snprintf(buf + len, MAX_PATH_LEN - len,
                                 "\x1b[1;31m  *%s, check\x1b[0m\n", tmp1);
                else
                        snprintf(buf + len, MAX_PATH_LEN - len, "  *%s\n", tmp1);
        }

        return 0;
err_ret:
        return _errno(ret);
}

/*
 * Produce the verbose description of chunk @id in @buf.
 *
 * Active variant (#if 1): fetch the chunk info and dump it with
 * chkinfo_dump(). An EAGAIN from md_chunk_getinfo() is retried through
 * USLEEP_RETRY (NOTE(review): presumably up to 50 attempts with a 100 ms
 * sleep - confirm against the macro definition).
 * Disabled variant (#else): print the id and the replica locations via
 * __stor_chunk_loctaion().
 *
 * Returns 0 on success, an errno value otherwise.
 */
static int __stor_statbyidb_verbose(const char *pool, const fileid_t *id, char *buf)
{
#if 1
        int ret, retry = 0;
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX];

        chkinfo = (void *)_chkinfo;
        
        /* "retry" is both this label and the attempt counter declared above */
retry:
        ret = md_chunk_getinfo(pool, NULL, id, chkinfo, NULL);
        if (unlikely(ret)) {
                ret = _errno(ret);
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        chkinfo_dump(chkinfo, NULL, buf, 1);
        
#else
        snprintf(buf, MAX_BUF_LEN,
                 "id:"CHKID_FORMAT"\n",
                 CHKID_ARG(id));

retry:
        ret = __stor_chunk_loctaion(pool, id, buf + strlen(buf));
        if (unlikely(ret)) {
                if (ret == EAGAIN || ret == ENONET || ret == ENOSYS || ret == EBUSY) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                } else 
                        GOTO(err_ret, ret);
        }

#endif
        return 0;
err_ret:
        return _errno(ret);
}

static int __stor_statbyidb_simple(const char *pool, const chkid_t *id, char *buf)
{
        int ret;
        char tmp[MAX_NAME_LEN];
        chkinfo_t *chkinfo;
        fileinfo_t fileinfo;

        chkinfo = (void *)tmp;
        ret = md_chunk_getinfo(pool, NULL, id, chkinfo, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = md_getattr(id, &fileinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        snprintf(buf, MAX_BUF_LEN,
                 "id:"CHKID_FORMAT"\n"
                 "size:%llu\n"
                 "repnum:%llu\n",
                 CHKID_ARG(id),
                 (LLU)fileinfo.size,
                 (LLU)chkinfo->repnum);

        return 0;
err_ret:
        return _errno(ret);
}

/*
 * Describe chunk @id in @buf, using the verbose or the simple formatter
 * depending on @verbose. The formatter's result goes through _errno().
 */
int stor_stat(const char *pool, const chkid_t *id, char *buf, int verbose)
{
        int ret;

        if (verbose) {
                ret = __stor_statbyidb_verbose(pool, id, buf);
        } else {
                ret = __stor_statbyidb_simple(pool, id, buf);
        }

        return _errno(ret);
}

/*
 * Print the verbose description of every chunk of file @id to stdout.
 * Chunks reported as ENOENT or ENOKEY are skipped silently; any other
 * error aborts the walk.
 * Returns 0 on success, an errno value otherwise.
 */
int stor_stat_file(const char *pool, const fileid_t *id)
{
        int ret;
        uint64_t i, chknum;
        chkid_t chkid;
        fileinfo_t fileinfo;
        char buf[MAX_BUF_LEN];

        ret = md_getattr(id, &fileinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        // TODO if vol is large
        chknum = size2chknum(fileinfo.size, &fileinfo.ec);

        for (i = 0; i < chknum; i++) {
                fid2cid(&chkid, id, i);

                ret = __stor_statbyidb_verbose(pool, &chkid, buf);
                if (unlikely(ret)) {
                        if (ret != ENOENT && ret != ENOKEY) {
                                GOTO(err_ret, ret);
                        }

                        /* chunk not present: nothing to print */
                        continue;
                }

                printf("chunk[%llu]: %s\n", (LLU)i, buf);
        }

        return 0;

err_ret:
        return _errno(ret);
}

/*
 * Bring up the storage layer.
 *
 * Unless @max_chunk is (uint64_t)-1, the replica service is initialised
 * on "<home>/chunk" together with its RPC side. Afterwards the main
 * loop is started and the root, metadata, volume, system and (when
 * compiled in and enabled) read-cache subsystems are initialised,
 * followed by the storage-status background handler.
 *
 * Asserts that gloconf.metadata_replica lies in [2, 4].
 * Returns 0 on success, otherwise _errno() of the first failing step.
 */
int stor_init(const char *home, uint64_t  max_chunk)
{
        int ret;
        char path[MAX_PATH_LEN];

        ANALYSIS_BEGIN(0);

        YASSERT(gloconf.metadata_replica >= 2 && gloconf.metadata_replica <= 4);

        if (max_chunk == (uint64_t)-1) {
                /* no local replica service requested */
        } else {
                /* bound by the real buffer size (was MAX_NAME_LEN, which truncated long homes) */
                snprintf(path, sizeof(path), "%s/chunk", home);
                ret = replica_srv_init(path, max_chunk);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = replica_rpc_init();
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ANALYSIS_END(0, 1000 * 100, NULL);

        main_loop_start();

        ret = stor_root_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = md_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = volume_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = system_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if 0
        ret = rmvol_bh_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = rollback_bh_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = rmsnap_bh_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

#if ENABLE_REDIS_CACHE
        if (gloconf.redis_cache) {
                ret = readcache_init(gloconf.redis_sock);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }
#endif

        ret = storage_status_bh_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return _errno(ret);
}

/*
 * Tear down the storage layer. Only the replica service is destroyed
 * here; the other subsystems started by stor_init() are not touched.
 */
void stor_destroy()
{
        replica_srv_destroy();
}

/*
 * Read @_size bytes at @offset from volume @id into @_buf.
 * Builds an io_t and forwards to volume_read() without a node hint;
 * the result goes through _errno().
 */
int IO_FUNC stor_read(const char *pool, const fileid_t *id, buffer_t *_buf, size_t _size, off_t offset)
{
        int ret;
        io_t io;

        io_init(&io, id, NULL, offset, _size, 0);
        ret = volume_read(pool, &io, _buf, NULL);

        return _errno(ret);
}

/*
 * Write @_size bytes from @_buf at @offset into volume @id.
 * Builds an io_t and forwards to volume_write() without a node hint;
 * the result goes through _errno().
 */
int stor_write(const char *pool, const fileid_t *id, const buffer_t *_buf, size_t _size, off_t offset)
{
        int ret;
        io_t io;

        io_init(&io, id, NULL, offset, _size, 0);
        ret = volume_write(pool, &io, _buf, NULL);

        return _errno(ret);
}

/* Obtain the node of @id via vnode_location(); the result goes through _errno(). */
int stor_location(const char *pool, const fileid_t *id, nid_t *nid)
{
        int ret;

        ret = vnode_location(pool, id, nid);

        return _errno(ret);
}

/*
 * Same as stor_read(), but passes @nid on to volume_read().
 * The result goes through _errno().
 */
int stor_read_withnid(const char *pool, const fileid_t *id, buffer_t *_buf, size_t _size, off_t offset, nid_t *nid)
{
        int ret;
        io_t io;

        io_init(&io, id, NULL, offset, _size, 0);
        ret = volume_read(pool, &io, _buf, nid);

        return _errno(ret);
}

/*
 * Same as stor_write(), but passes @nid on to volume_write().
 * The result goes through _errno().
 */
int stor_write_withnid(const char *pool, const fileid_t *id, const buffer_t *_buf, size_t _size, off_t offset, nid_t *nid)
{
        int ret;
        io_t io;

        io_init(&io, id, NULL, offset, _size, 0);
        ret = volume_write(pool, &io, _buf, nid);

        return _errno(ret);
}

/*
 * core_request0() callback: perform the write described by the
 * core_ctx_t in @_core_ctx and leave the outcome in its retval field.
 */
STATIC void __stor_write_remote(void *_core_ctx)
{
        core_ctx_t *ctx = _core_ctx;
        int ret;

        ret = stor_write(NULL, &ctx->volid, ctx->buf, ctx->size, ctx->offset);
        if (ret) {
                GOTO(err_ret, ret);
        }

        ctx->retval = 0;
        return;

err_ret:
        ctx->retval = ret;
        return;
}

/*
 * Write to @volid by handing the request to the core selected with
 * core_hash(volid) and collecting the callback's result afterwards.
 * Returns 0 on success, the dispatch error, or the write error.
 */
int stor_write_remote(const volid_t *volid, const buffer_t *buf, size_t size, off_t offset)
{
        int ret;
        core_ctx_t core_ctx;

        core_ctx.retval = 0;
        core_ctx.buf = (buffer_t *)buf;
        core_ctx.offset = offset;
        core_ctx.size = size;
        core_ctx.volid = *volid;

        ret = core_request0(core_hash(volid), __stor_write_remote,
                        &core_ctx, "write_remote");
        if (ret) {
                GOTO(err_ret, ret);
        }

        /* The request itself went through; now check what the callback reported. */
        ret = core_ctx.retval;
        if (ret) {
                GOTO(err_ret, ret);
        }

        return 0;

err_ret:
        return ret;
}

/*
 * core_request0() callback: perform the read described by the
 * core_ctx_t in @_core_ctx and leave the outcome in its retval field.
 */
STATIC void __stor_read_remote(void *_core_ctx)
{
        core_ctx_t *ctx = _core_ctx;
        int ret;

        ret = stor_read(NULL, &ctx->volid, ctx->buf, ctx->size, ctx->offset);
        if (ret) {
                GOTO(err_ret, ret);
        }

        ctx->retval = 0;
        return;

err_ret:
        ctx->retval = ret;
        return;
}

/*
 * Read from @volid by handing the request to the core selected with
 * core_hash(volid) and collecting the callback's result afterwards.
 * Note that @buf is declared const but is the destination of the read.
 * Returns 0 on success, the dispatch error, or the read error.
 */
int stor_read_remote(const volid_t *volid, const buffer_t *buf, size_t size, off_t offset)
{
        int ret;
        core_ctx_t core_ctx;

        core_ctx.retval = 0;
        core_ctx.buf = (buffer_t *)buf;
        core_ctx.offset = offset;
        core_ctx.size = size;
        core_ctx.volid = *volid;

        ret = core_request0(core_hash(volid), __stor_read_remote,
                        &core_ctx, "read_remote");
        if (ret) {
                GOTO(err_ret, ret);
        }

        /* The request itself went through; now check what the callback reported. */
        ret = core_ctx.retval;
        if (ret) {
                GOTO(err_ret, ret);
        }

        return 0;

err_ret:
        return ret;
}

/*
 * Positional write from a flat memory buffer.
 *
 * Forwards every argument unchanged to volume_pwrite() and returns its
 * result as-is (no _errno() translation is applied here).
 */
int stor_pwrite(const char *pool, const fileid_t *id, const char *_buf, size_t size, off_t offset)
{
        int ret = volume_pwrite(pool, id, _buf, size, offset);

        return ret;
}

/*
 * Positional read into a flat memory buffer.
 *
 * Forwards every argument unchanged to volume_pread() and returns its
 * result as-is (no _errno() translation is applied here).
 */
int stor_pread(const char *pool, const fileid_t *id, char *_buf, size_t size, off_t offset)
{
        int ret = volume_pread(pool, id, _buf, size, offset);

        return ret;
}

/*
 * Unmap the range [offset, offset + _size) of volume `id`.
 *
 * Builds an io_t for the range (no buffer, flags 0) and passes it to
 * volume_unmap(). The result goes through _errno() before being returned.
 */
int stor_unmap(const char *pool, const fileid_t *id, size_t _size, off_t offset)
{
        io_t io;

        /*
         * Log with specifiers that match the argument types: off_t is signed
         * (printed through intmax_t) and the size is a size_t, which must not
         * be truncated to 32 bits.
         */
        DINFO("stor_unmap %jd, %zu\r\n", (intmax_t)offset, _size);
        io_init(&io, id, NULL, offset, _size, 0);
        return _errno(volume_unmap(pool, &io));
}

/*
 * Move the chunk identified by `fileid` so that `dist` becomes the first
 * entry of its replica node list.
 *
 * The current replica list is fetched with md_chunk_getinfo() and copied.
 * If `dist` already appears at a position > 0, that slot receives the old
 * first entry (a swap); if it is absent or already first, `dist` simply
 * overwrites entry 0. The new list is run through metadata_check_diskid()
 * and submitted with md_move(), addressed to the current first replica.
 *
 * Returns 0 on success or the error from md_chunk_getinfo()/md_move().
 */
static int __stor_move_md(const char *pool, const fileid_t *fileid, const nid_t *dist)
{
        int ret, i;
        int found = -1;
        char infobuf[CHKINFO_MAX];
        chkinfo_t *info = (void *)infobuf;
        nid_t target[LICH_REPLICA_MAX];

        ret = md_chunk_getinfo(pool, NULL, fileid, info, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* copy the replica list; remember the (last) slot already held by dist */
        for (i = 0; i < info->repnum; i++) {
                target[i] = info->diskid[i].id;

                if (nid_cmp(&info->diskid[i].id, dist) == 0)
                        found = i;
        }

        /* dist is a non-leading replica: give its slot to the old leader */
        if (found > 0)
                target[found] = target[0];

        target[0] = *dist;

        metadata_check_diskid(target, info->repnum);

        ret = md_move(&info->diskid[0].id, fileid, target, info->repnum);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Load the local node id from "<gloconf.home>/data/node/config/nid".
 *
 * The file content is read with _get_text() and parsed into *nid with
 * str2nid().
 *
 * Returns 0 on success, ENAMETOOLONG if the path does not fit into the
 * local buffer, or the (positive) error derived from a negative
 * _get_text() return value.
 */
static int __stor_getlocalnid(nid_t *nid)
{
        int ret, len;
        char path[MAX_PATH_LEN], tmp[MAX_NAME_LEN];

        /* bound by the real buffer size and refuse to use a truncated path */
        len = snprintf(path, sizeof(path), "%s/data/node/config/nid", gloconf.home);
        if (len < 0 || (size_t)len >= sizeof(path)) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        ret = _get_text(path, tmp, MAX_NAME_LEN);
        if (ret < 0) {
                /* _get_text() reports failures as negative values */
                ret = -ret;
                GOTO(err_ret, ret);
        }

        str2nid(nid, tmp);

        return 0;
err_ret:
        return ret;
}

/*
 * Localize a volume: ask __stor_move_md() to put this node first in the
 * replica list of the volume's chunk.
 *
 * Only __VOLUME_CHUNK__ ids are accepted (EINVAL otherwise). Nothing is done,
 * and 0 is returned, when the volume has __FILE_ATTR_MULTPATH__ set or does
 * not have __FILE_ATTR_LOCALIZE__ set.
 *
 * The local nid comes from net_getnid(). If that is null, it is loaded with
 * __stor_getlocalnid() and connected with network_connect1(); a connect
 * failure is reported as ENOSPC.
 *
 * Returns 0 on success or the error of the failing step.
 */
int stor_localize(const char *pool, const fileid_t *fileid)
{
        int ret;
        nid_t local;
        fileinfo_t info;

        if (fileid->type != __VOLUME_CHUNK__) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = vnode_getattr(pool, fileid, &info, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* multipath volumes are never localized */
        if ((info.attr & __FILE_ATTR_MULTPATH__) != 0)
                return 0;

        /* localization was not requested for this volume */
        if ((info.attr & __FILE_ATTR_LOCALIZE__) == 0)
                return 0;

        local = *net_getnid();
        if (net_isnull(&local)) {
                ret = __stor_getlocalnid(&local);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = network_connect1(&local);
                if (unlikely(ret)) {
                        /* any connect failure is mapped to ENOSPC */
                        ret = ENOSPC;
                        GOTO(err_ret, ret);
                }
        }

        ret = __stor_move_md(pool, fileid, &local);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Report where the chunk of `fileid` lives: the first replica's id is copied
 * to *_diskid and the name returned by network_rname() for it is copied
 * into `buf`.
 *
 * The first replica is connected with network_connect1() before its name is
 * queried.
 *
 * NOTE(review): the name is copied with strcpy() and no buffer length is
 * passed in, so `buf` must be large enough for whatever network_rname()
 * returns -- confirm callers size it accordingly.
 *
 * Returns 0 on success; errors go through _errno() before being returned.
 */
int stor_get_location(const char *pool, const fileid_t *fileid, char *buf, diskid_t *_diskid)
{
        int ret;
        char infobuf[CHKINFO_MAX];
        chkinfo_t *info = (void *)infobuf;
        diskid_t *first;

        ret = md_chunk_getinfo(pool, NULL, fileid, info, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        first = &info->diskid[0].id;

        ret = network_connect1(first);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *_diskid = *first;
        strcpy(buf, network_rname(first));

        return 0;
err_ret:
        return _errno(ret);
}

/*
 * Create a snapshot called `name` under volume `parent`.
 *
 * An empty name is rejected with EINVAL; otherwise all arguments are
 * forwarded to md_snapshot_create_with_area().
 *
 * Returns 0 on success or the raw error code (this function does not apply
 * _errno() to it).
 */
int stor_snapshot_create(const fileid_t *parent, const char *name, int p, const char *_site)
{
        int ret;

        if (name[0] == '\0') {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = md_snapshot_create_with_area(parent, name, p, _site);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Roll volume `parent` back to its snapshot `name`.
 *
 * An empty name is rejected with EINVAL; otherwise the request is forwarded
 * to md_snapshot_rollback().
 *
 * Returns 0 on success; errors go through _errno() before being returned.
 */
int stor_snapshot_rollback(const fileid_t *parent, const char *name)
{
        int ret;

        if (name[0] == '\0') {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = md_snapshot_rollback(parent, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return _errno(ret);
}

/*
 * Forward a snapshot "flat" request for `fileid` to md_snapshot_flat(),
 * passing `idx` and `force` through unchanged.
 *
 * Returns 0 on success; errors go through _errno() before being returned.
 */
int stor_snapshot_flat(const fileid_t *fileid, const int idx, int force)
{
        int ret = md_snapshot_flat(fileid, idx, force);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return _errno(ret);
}

/*
 * Remove snapshot `name` of volume `parent`.
 *
 * An empty name is rejected with EINVAL, and `parent` is asserted to be a
 * __VOLUME_CHUNK__ id. For VOLUME_FORMAT_RAW volumes the snapshot is looked
 * up first (also when `force` is set); unless `force` is set, a snapshot
 * carrying __FILE_ATTR_PROTECT__ is refused with EPERM. Other volume
 * formats skip these checks. The removal itself is done by
 * md_snapshot_remove().
 *
 * Returns 0 on success; errors go through _errno() before being returned.
 */
int stor_snapshot_remove(const char *pool, const fileid_t *parent, const char *name, int force)
{
        int ret;
        fileid_t snapid;
        fileinfo_t vol_info;
        fileinfo_t snap_info;

        if (name[0] == '\0') {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        YASSERT(parent->type == __VOLUME_CHUNK__);

        ret = md_getattr(parent, &vol_info);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (lich_volume_format(&vol_info) == VOLUME_FORMAT_RAW) {
                ret = stor_lookup(pool, parent, name, &snapid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                if (!force) {
                        ret = vnode_getattr(pool, &snapid, &snap_info, 1);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        /* protected snapshots can only be removed with force */
                        if (snap_info.attr & __FILE_ATTR_PROTECT__) {
                                ret = EPERM;
                                GOTO(err_ret, ret);
                        }
                }
        }

        ret = md_snapshot_remove(parent, name, force);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return _errno(ret);
}

/*
 * Apply the protect setting `on` to a snapshot of volume `parent`.
 *
 * For VOLUME_FORMAT_RAW volumes the snapshot `name` is resolved with
 * stor_lookup() and md_snapshot_protect() is called on the snapshot's id.
 * For any other format `name` is not used and md_snapshot_protect() is
 * called on `parent` itself.
 *
 * Returns 0 on success; errors go through _errno() before being returned.
 */
int stor_snapshot_protect(const char *pool, const fileid_t *parent, const char *name, snap_protect_param_t on)
{
        int ret;
        fileid_t snapid;
        fileinfo_t vol_info;
        const fileid_t *target;

        ret = md_getattr(parent, &vol_info);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (lich_volume_format(&vol_info) == VOLUME_FORMAT_RAW) {
                ret = stor_lookup(pool, parent, name, &snapid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                target = &snapid;
        } else {
                /* non-raw formats are addressed through the volume id */
                target = parent;
        }

        ret = md_snapshot_protect(target, on);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return _errno(ret);
}

/*
 * Forward to md_snapshot_updateparent() with `parent`, `name` and `from`
 * passed through unchanged.
 *
 * Returns 0 on success; errors go through _errno() before being returned.
 */
int stor_snapshot_updateparent(const fileid_t *parent, const char *name, uint64_t from)
{
        int ret = md_snapshot_updateparent(parent, name, from);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return _errno(ret);
}

/*
 * Forward to md_snapshot_setfrom() with `fileid` and `from` passed through
 * unchanged.
 *
 * Returns 0 on success; errors go through _errno() before being returned.
 */
int stor_snapshot_setfrom(const fileid_t *fileid, uint64_t from)
{
        int ret = md_snapshot_setfrom(fileid, from);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return _errno(ret);
}

/*
 * Open a snapshot listing for `fileid`, identified by `uuid`.
 *
 * Thin forwarder to md_snapshot_listopen(); its result is returned as-is.
 */
int stor_snapshot_listopen(const fileid_t *fileid, const char *uuid)
{
        int ret = md_snapshot_listopen(fileid, uuid);

        return ret;
}

/*
 * Fetch snapshot listing entries for `fileid` within the listing identified
 * by `uuid`.
 *
 * Thin forwarder to md_snapshot_list(); `offset`, `de` and `delen` are passed
 * through unchanged and the result is returned as-is.
 */
int stor_snapshot_list(const fileid_t *fileid, const char *uuid, uint64_t offset, void *de, int *delen)
{
        int ret = md_snapshot_list(fileid, uuid, offset, de, delen);

        return ret;
}

/*
 * Close the snapshot listing of `fileid` identified by `uuid`.
 *
 * Thin forwarder to md_snapshot_listclose(); its result is returned as-is.
 */
int stor_snapshot_listclose(const fileid_t *fileid, const char *uuid)
{
        int ret = md_snapshot_listclose(fileid, uuid);

        return ret;
}

/*
 * Read `_size` bytes at `offset` from snapshot `fileid` of volume `parent`
 * into `_buf`.
 *
 * Calls volume_snapshot_read() with its last argument fixed to TRUE and
 * passes the result through _errno().
 */
int stor_snapshot_read(const char *pool, const fileid_t *parent, const fileid_t *fileid, buffer_t *_buf, size_t _size, off_t offset)
{
        int ret;

        ret = volume_snapshot_read(pool, parent, fileid, _buf, _size, offset, TRUE);

        return _errno(ret);
}

/*
 * Compute the difference between snapshots `snapsrc` and `snapdst` of volume
 * `parent` for the range described by `_size` and `offset`, using `_buf`.
 *
 * Forwards all arguments to volume_snapshot_diff() and passes the result
 * through _errno().
 */
int stor_snapshot_diff(const char *pool, const fileid_t *parent, const fileid_t *snapsrc,
                const fileid_t *snapdst, buffer_t *_buf, size_t _size, off_t offset)
{
        int ret;

        ret = volume_snapshot_diff(pool, parent, snapsrc, snapdst, _buf, _size, offset);

        return _errno(ret);
}

/*
 * Check snapshot `name` of `fid` through md_snapshot_check().
 *
 * An empty name is rejected with EINVAL before md_snapshot_check() is
 * called.
 *
 * Returns 0 on success; errors go through _errno() before being returned.
 */
int stor_snapshot_check(const fileid_t *fid, const char *name)
{
        int ret;

        if (name[0] == '\0') {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = md_snapshot_check(fid, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return _errno(ret);
}

/*
 * Create a snapshot called `name` for a group of volumes.
 *
 * An empty name is rejected with EINVAL; otherwise `fids`, `name`, `count`
 * and `_site` are forwarded to md_group_snapshot_create().
 *
 * Returns 0 on success or the raw error code (this function does not apply
 * _errno() to it).
 */
int stor_group_snapshot_create(const fileid_t *fids, const char *name, int count, const char **_site)
{
        int ret;

        if (name[0] == '\0') {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = md_group_snapshot_create(fids, name, count, _site);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}
